package com.gfscold.trans.common.deserialize;
import com.gfscold.trans.common.constant.CdcType;
import org.apache.kafka.connect.data.Struct;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

//自定义序列化器
public class MySQLCDCSourceDeserialize implements DebeziumDeserializationSchema<CdcType> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<CdcType> collector) throws Exception {
        Struct struct = (Struct) sourceRecord.value();
        String beforeId = struct.getStruct("before").getString("id");
        String before = struct.getString("before");
        String after = struct.getString("after");
        Struct source = struct.getStruct("source");
        String db = source.getString("db");
        String table = source.getString("table");
        String op = struct.getString("op");
        String ts_ms = struct.getString("ts_ms");
        String transaction = struct.getString("transaction");
        CdcType builder = CdcType.builder()
                .beforeId(beforeId)
                .before(before)
                .after(after)
                .db(db)
                .table(table)
                .op(op)
                .ts_ms(ts_ms)
                .transaction(transaction)
                .build();
        collector.collect(builder);
    }

    @Override
    public TypeInformation<CdcType> getProducedType() {
        return TypeInformation.of(CdcType.class);
    }
}
